/////////////////////////////////////////////////////////////////////////////
// Original code from libhdfs3. Copyright (c) 2013 - 2014, Pivotal Inc.
// All rights reserved. Author: Zhanwei Wang
/////////////////////////////////////////////////////////////////////////////
//  Modifications by Kumo Inc.
// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//



#include <memory>
#include <kmhdfs/proto/protobuf_rpc_engine.pb.h>
#include <kmhdfs/rpc/rpc_call.h>
#include <kmhdfs/rpc/rpc_content_wrapper.h>
#include <kmhdfs/proto/rpc_header.pb.h>
#include <kmhdfs/rpc/rpc_remote_call.h>
#include <kmhdfs/common/write_buffer.h>

#include <google/protobuf/io/coded_stream.h>

// Call id written into the RPC request header of ping requests
// (see RpcRemoteCall::GetPingRequest in this file).
// Parenthesized so the negative literal always expands as a single operand.
#define PING_CALL_ID (-4)

using namespace google::protobuf::io;

namespace Hdfs {
namespace Internal {

void RpcRemoteCall::serialize(const RpcProtocolInfo & protocol,
                              WriteBuffer & buffer) {
    RpcRequestHeaderProto rpcHeader;
    rpcHeader.set_callid(identity);
    rpcHeader.set_clientid(clientId);
    rpcHeader.set_retrycount(-1);
    rpcHeader.set_rpckind(RPC_PROTOCOL_BUFFER);
    rpcHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);
    RequestHeaderProto requestHeader;
    requestHeader.set_methodname(call.getName());
    requestHeader.set_declaringclassprotocolname(protocol.getProtocol());
    requestHeader.set_clientprotocolversion(protocol.getVersion());
    RpcContentWrapper wrapper(&requestHeader, call.getRequest());
    int rpcHeaderLen = rpcHeader.ByteSizeLong();
    int size = CodedOutputStream::VarintSize32(rpcHeaderLen) + rpcHeaderLen + wrapper.getLength();
    buffer.writeBigEndian(size);
    buffer.writeVarint32(rpcHeaderLen);
    rpcHeader.SerializeToArray(buffer.alloc(rpcHeaderLen), rpcHeaderLen);
    wrapper.writeTo(buffer);
}

/**
 * Build the serialized bytes of an RPC ping request.
 *
 * The result holds, in order: the big-endian total length, the
 * varint-encoded length of the RpcRequestHeaderProto, and the header itself.
 * Unlike serialize(), nothing is written after the header.
 *
 * @param clientid Client id stored in the ping header.
 * @return A copy of the serialized ping request.
 */
std::vector<char> RpcRemoteCall::GetPingRequest(const std::string & clientid) {
    RpcRequestHeaderProto pingHeader;
    pingHeader.set_callid(PING_CALL_ID);
    pingHeader.set_clientid(clientid);
    pingHeader.set_retrycount(INVALID_RETRY_COUNT);
    pingHeader.set_rpckind(RpcKindProto::RPC_PROTOCOL_BUFFER);
    pingHeader.set_rpcop(RpcRequestHeaderProto_OperationProto_RPC_FINAL_PACKET);

    // ByteSizeLong() is called exactly once: it yields the header length and
    // leaves the cached sizes that SerializeWithCachedSizesToArray() relies on.
    const int rpcHeaderLen = static_cast<int>(pingHeader.ByteSizeLong());
    const int size = static_cast<int>(CodedOutputStream::VarintSize32(rpcHeaderLen)) + rpcHeaderLen;

    WriteBuffer buffer;
    buffer.writeBigEndian(size);
    buffer.writeVarint32(rpcHeaderLen);
    // Reuse rpcHeaderLen rather than asking the message for its size again
    // through the deprecated ByteSize().
    pingHeader.SerializeWithCachedSizesToArray(
        reinterpret_cast<unsigned char *>(buffer.alloc(rpcHeaderLen)));

    // Copy the bytes out so the result does not depend on the local buffer.
    std::vector<char> retval(buffer.getDataSize(0));
    memcpy(retval.data(), buffer.getBuffer(0), retval.size());
    return retval;
}

}
}

